This example shows how to create an OPC Unified Architecture data event source and query it for events carrying an even data value.
The main program:
// This example shows how to create an OPC Unified Architecture data event source, and query it for events carrying an even
// data value.
using System;
using System.Diagnostics;
using System.Reactive;
using System.ServiceModel;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
using OpcLabs.EasyOpc.UA.ComplexEventProcessing;
using OpcLabs.EasyOpc.UA.Reactive;
namespace SimpleUAStreamInsightApplication
class Program
static void Main()
// Create an embedded StreamInsight server
//using (var server = Server.Create("Default"))
using (var server = Server.Create("Instance1"))
Debug.Assert(server != null);
// Create a local endpoint for the server embedded in this program
var managementService = server.CreateManagementService();
Debug.Assert(managementService != null);
var host = new ServiceHost(managementService);
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message),
/* The following entities will be defined and available in the server for other clients:
* serverApp
* serverSource
* serverSink
* serverProcess
*/
// CREATE a StreamInsight APPLICATION in the server
var myApp = server.CreateApplication("serverApp");
// DEFINE a simple SOURCE (returns a point event every second)
var observable = UADataChangeNotificationObservable.Create<int>(
"nsu= ;i=11017", // Data/Dynamic/UserScalar/Int32Value
var mySource = myApp
.DefineObservable(() => observable)
eventArgs => PointEvent.CreateInsert(
// DEPLOY the source to the server for clients to use
// Compose a QUERY over the source (returns every event carrying an even data value)
var myQuery = from e in mySource
where e.AttributeDataPayload.Value % 2 == 0
select e;
// DEFINE a simple observer SINK (writes the value to the server console)
var mySink = myApp.DefineObserver(() => Observer.Create<UADataChangeNotificationPayload<int>>(
payload => Console.WriteLine("sink_Server..: {0}", payload)));
// DEPLOY the sink to the server for clients to use
// BIND the query to the sink and RUN it
var binding = myQuery.Bind(mySink);
Debug.Assert(binding != null);
using (/*var proc = */binding.Run("serverProcess"))
// Wait until the user stops the server
Console.WriteLine("MyStreamInsightServer is running, press Enter to stop the server");
Console.WriteLine(" ");
See Also